// Copyright 2014 The mqrouter Author. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package kafka

import (
	"context"
	"encoding/json"
	"fmt"
	"strings"
	"time"

	"gitee.com/xfrm/middleware/xmq/pub"

	"gitee.com/xfrm/middleware/xlog"
	"gitee.com/xfrm/middleware/xtime"
	"gitee.com/xfrm/middleware/xtrace"
	"github.com/segmentio/kafka-go"

	. "gitee.com/xfrm/middleware/internal/middlewareconf"
)

// KafkaWriter embeds a *kafka.Writer and keeps alongside it the WriterConfig
// it was created with and a channel used to stop the background stats loop.
type KafkaWriter struct {
	*kafka.Writer
	// NOTE: there is no getter for the writer's config, so a copy is kept here.
	config kafka.WriterConfig
	// stop is received on by the init loop; a value on it makes that loop return.
	stop   chan struct{}
}

// NewKafkaWriter builds a KafkaWriter for the given brokers and topic.
//
// Batch size, async mode, write timeout and batch timeout are taken from
// mqConfig; messages are balanced with kafka.Hash. RequiredAcks is not set
// here. A goroutine running the writer's init loop is started before the
// writer is returned; it runs until Close is called.
func NewKafkaWriter(brokers []string, topic string, mqConfig *MQConfig) *KafkaWriter {
	var cfg kafka.WriterConfig
	cfg.Brokers = brokers
	cfg.Topic = topic
	cfg.Balancer = &kafka.Hash{}
	cfg.BatchSize = mqConfig.BatchSize
	cfg.Async = mqConfig.Async
	cfg.Logger = xlog.GetInfoLogger()
	cfg.ErrorLogger = getErrorLogger()
	cfg.WriteTimeout = mqConfig.TimeOut
	cfg.BatchTimeout = mqConfig.BatchTimeout

	w := &KafkaWriter{
		Writer: kafka.NewWriter(cfg),
		config: cfg,
		stop:   make(chan struct{}),
	}

	// Periodic stats reporting lives in its own goroutine.
	go w.init()

	return w
}

// setKafkaSpanTags annotates span with the producer-side tracing tags for
// this writer: component, message bus type, span kind, messaging system,
// destination kind, and the topic (as both the message-bus destination and
// the messaging destination).
//
// The broker list is attached as a single joined string only when the writer
// was configured with at least one broker.
func (m *KafkaWriter) setKafkaSpanTags(span xtrace.Span) {
	config := m.config
	span.SetTag(xtrace.TagComponent, pub.TraceComponent)
	span.SetTag(xtrace.TagPalfishMessageBusType, pub.TraceMessageBusTypeKafka)
	span.SetTag(xtrace.TagSpanKind, xtrace.SpanKindProducer)
	span.SetTag(xtrace.TagMessageBusDestination, config.Topic)
	span.SetTag(xtrace.TagMessagingSystem, xtrace.MessagingSystemKafka)
	span.SetTag(xtrace.TagMessagingDestinationKind, xtrace.MessagingDestinationKindTopic)
	span.SetTag(xtrace.TagMessagingDestination, config.Topic)

	// The previous guard was inverted (== 0), so the tag was only ever written
	// as an empty string and never when brokers were actually configured.
	if len(config.Brokers) > 0 {
		span.SetTag(xtrace.TagPalfishKafkaConsumerBrokers, strings.Join(config.Brokers, ApolloMQBrokersSep))
	}
}

// init is the background stats loop started by NewKafkaWriter. Once per
// second it takes a snapshot of the embedded writer's Stats and hands it,
// together with the topic, to statKafkaWriter. The loop returns as soon as a
// value is received on m.stop.
func (m *KafkaWriter) init() {
	tick := time.NewTicker(time.Second)
	defer tick.Stop()

	for {
		select {
		case <-m.stop:
			return
		case <-tick.C:
			snapshot := m.Stats()
			statKafkaWriter(m.config.Topic, &snapshot)
		}
	}
}

// WriteMsg JSON-encodes v and writes it as one message with key k.
//
// If ctx carries a span, the span is tagged with this writer's Kafka tags
// before anything else happens. A marshalling failure is returned directly
// and nothing is written. Otherwise the duration of the WriteMessages call is
// reported through pub.StatReqDuration whether or not the write succeeded,
// and the write error (if any) is returned.
func (m *KafkaWriter) WriteMsg(ctx context.Context, k string, v interface{}) error {
	if span := xtrace.SpanFromContext(ctx); span != nil {
		m.setKafkaSpanTags(span)
	}

	payload, err := json.Marshal(v)
	if err != nil {
		return err
	}

	record := kafka.Message{
		Key:   []byte(k),
		Value: payload,
	}

	timer := xtime.NewTimeStat()
	writeErr := m.WriteMessages(ctx, record)
	pub.StatReqDuration(ctx, m.config.Topic, "KafkaWriter.WriteMessages", pub.TraceMessageBusTypeKafka, timer.Millisecond())

	return writeErr
}

// WriteMsgs writes the already-built messages msgs in a single WriteMessages
// call.
//
// If ctx carries a span, it is tagged with this writer's Kafka tags first.
// The call's duration is reported through pub.StatReqDuration regardless of
// the outcome, and the error from WriteMessages is returned unchanged.
func (m *KafkaWriter) WriteMsgs(ctx context.Context, msgs []kafka.Message) error {
	if span := xtrace.SpanFromContext(ctx); span != nil {
		m.setKafkaSpanTags(span)
	}

	timer := xtime.NewTimeStat()
	writeErr := m.WriteMessages(ctx, msgs...)
	pub.StatReqDuration(ctx, m.config.Topic, "KafkaWriter.WriteMessages", pub.TraceMessageBusTypeKafka, timer.Millisecond())

	return writeErr
}

// Close stops the background stats loop and then closes the embedded
// kafka.Writer, returning that writer's Close error.
//
// The send on m.stop completes only when the stats loop receives it, so this
// call waits for that hand-off before closing the writer.
// NOTE(review): once the loop has returned nothing receives on m.stop, so a
// second Close would block on the send — confirm callers invoke Close once.
func (m *KafkaWriter) Close() error {
	var signal struct{}
	m.stop <- signal

	closeErr := m.Writer.Close()
	return closeErr
}

// NewWriter looks up the MQ configuration for topic and returns a KafkaWriter
// built from it.
//
// The middleware config is obtained via GetMiddlewareConfig and the per-topic
// MQ config via GetMQConfig with MiddlewareTypeKafka; an error from either is
// returned as-is. If the resolved MQType is not MiddlewareTypeKafka an error
// is returned. Otherwise the writer is created against config.MQAddr with the
// topic name passed through pub.WrapTopicFromContext.
func NewWriter(ctx context.Context, topic string) (*KafkaWriter, error) {
	mwConf, err := GetMiddlewareConfig()
	if err != nil {
		return nil, err
	}

	mqConf, err := mwConf.GetMQConfig(ctx, topic, MiddlewareTypeKafka)
	if err != nil {
		return nil, err
	}

	// Only Kafka is supported; anything else is rejected.
	if mqConf.MQType != MiddlewareTypeKafka {
		return nil, fmt.Errorf("mqType %d error", mqConf.MQType)
	}

	return NewKafkaWriter(mqConf.MQAddr, pub.WrapTopicFromContext(ctx, topic), mqConf), nil
}
